MLP

2.1 数据集的准备

由于要随机生成三类每类包含20个样本点的数据,我将随机选取三个三维的坐标,然后以它们作为中心,分别生成三个高斯分布的样本群,每个样本群有20个样本。

import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
np.random.seed(50)

# 随机生成三类数据,每类20个样本,数据维度为3, 并设置分别的中心点
class1 = np.random.randn(20, 3) + [1, 1, 1]
class2 = np.random.randn(20, 3) + [4, 2, 1]
class3 = np.random.randn(20, 3) + [5, 2, 9]

X = np.vstack((class1, class2, class3)) # 按行堆叠数据
y = np.array([0]*20 + [1]*20 + [2]*20)    # 指定每个样本的类别标签

# 划分训练验证集
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=50)

# 独热编码 
encoder = OneHotEncoder(sparse=False)
y_train_onehot = encoder.fit_transform(y_train.reshape(-1, 1)) 
y_test_onehot = encoder.transform(y_test.reshape(-1, 1))

2.2 BP神经网络-MLP多层感知机构建

现在我们开始实现最简单的BP神经网络:MLP多层感知机。由于MLP的前馈结构,我们可以将其拆分成许多层,每一层都应该具备3个基本的功能,它们分别是:

  1. 根据(上一层的)输入计算输出
  2. 计算参数的梯度
  3. 更新层参数

激活函数可以单独抽象为单独的一层,不具有参数。

2.2.1 基类层设计

为了形式统一,我们先定义基类,再让不同的层都继承于基类。

# 基类层
class Layer:
	# 前向传播函数,根据上一层输入x计算
	def forward(self, x):
		raise NotImplementedError # 未实现错误

	# 反向传播函数,输入下一层回传的梯度grad, 输出当前层的梯度
	def backward(self, grad):
		raise NotImplementedError

	# 更新函数,用于更新当前层的参数
	def update(self, learning_rate):
		pass

2.2.2 线性层设计

线性层是MLP中最基本的结构之一,其参数为 ( W ) 和 ( b ),输入和输出关系为: 由于其结构相当于将前后的神经元两两连接,所以也叫全连接层。

为什么 x 的维度有一个 batch_size

在神经网络训练中,通常会将多个输入样本打包成一个批次(batch)进行处理。这种方法被称为批量处理。这样做有以下几个好处:

  1. 提高计算效率:使用向量化操作处理多个样本比逐个处理单个样本要快得多。
  2. 稳定梯度下降:在每次迭代中使用一个批次的样本计算梯度,可以平滑梯度的波动,从而使训练过程更稳定。
class Linear(Layer):
	def __init__(self, num_in, num_out, use_bias=True):
		self.num_in = num_in # 输入维度
		self.num_out = num_out # 输出维度
		self.use_bias = use_bias # 是否添加偏置
	
		# 参数的初始化(绝对不能初始化为0!不然后续计算失去意义)
		# 用正态分布来初始化W
		self.W = np.random.normal(loc=0, scale=1.0, size=(num_out, num_in))
		if use_bias:
			self.b = np.zeros((1, num_out))

	def forward(self, x):
		# 前向传播 y = Wx + b
		# x的维度为(batch_size, num_in)
		self.x = x
		self.y = x @ self.W.T # y的维度为(batch_size, num_out)
		if self.use_bias:
			self.y += self.b
		return self.y

	def backward(self, grad):
		# 反向传播,按照链式法则计算
		# grad的维度为(batch_size, num_out)
		# 梯度应该对batch_size去平均值
		# grad_W的维度应该与W相同,为(num_in, num_out)
		self.grad_W = self.x.T @ grad / grad.shape[0]
		if self.use_bias:
			# grad_b的维度与b相同,(1, num_out)
			self.grad_b = np.mean(grad, axis=0, keepdims=True) # 对 grad 沿批次维度(行)取平均值,并保留维度信息以确保结果形状与偏置向量 b 一致。
		# 往上一层传递的grad维度应该为(batch_size, num_in)
		grad = grad @ self.W
		return grad

	def update(self, learning_rate):
		# 更新参数以完成梯度下降
		self.W -= learning_rate * self.grad_W
		if self.use_bias:
			self.b -= learning_rate * self.grad_b

2.2.3 激活层设计

现在我们来实现激活层的设计,以计算反向传播,这里主要实现三种激活层:

  1. Sigmoid激活层
  2. Tanh激活层
  3. ReLU激活层
  4. 啥都不动层
  5. Softmax激活层:用于多分类问题的最后一层激活,归一化所有概率
class Identity(Layer):
	# 啥都不动层
	def forward(self, x):
		return x
	def backward(self, grad):
		return grad

class Sigmoid(Layer):
	# Sigmoid激活层
	def forward(self, x):
		self.x = x
		self.y = 1 / (1 + np.exp(-x))
		return self.y

	def backward(self, grad):
		return grad * self.y * (1 - self.y)

class Tanh(Layer):
	# Tanh激活层
	def forward(self, x):
		self.x = x
		self.y = np.tanh(x)
		return self.y

	def backward(self, grad):
		return grad * (1 - self.y ** 2)

class ReLU(Layer):
	# Relu激活层
	def forward(self, x):
		self.x = x
		self.y = np.maximum(x, 0)
		return self.y

	def backward(self, grad):
		return grad * (self.x >= 0)

class Softmax(Layer):
	def forward(self, x):
		exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
		self.y = exp_x / np.sum(exp_x, axis=1, keepdims=True)
		return self.y
	
	def backward(self, grad):
		return grad

# 存储所有激活函数和对应名称,方便索引
activation_dict = {
	'identity': Identity,
	'sigmoid': Sigmoid,
	'tanh': Tanh,
	'relu': ReLU,
	'softmax': Softmax
}	 

2.2.4 MLP搭建

现在我们已经有了用来构建MLP的所有层,现在把它们拼起来即可得到MLP。

class Linear(Layer):
    def __init__(self, num_in, num_out, use_bias=True):
        self.num_in = num_in  # 输入维度
        self.num_out = num_out  # 输出维度
        self.use_bias = use_bias  # 是否添加偏置

        # 参数的初始化(绝对不能初始化为0!不然后续计算失去意义)
        # 用正态分布来初始化W
        self.W = np.random.normal(loc=0, scale=1.0, size=(num_in, num_out))
        if use_bias:
            self.b = np.zeros((1, num_out))

    def forward(self, x):
        # 前向传播 y = xW + b
        # x的维度为(batch_size, num_in)
        self.x = x
        self.y = x @ self.W  # y的维度为(batch_size, num_out)
        if self.use_bias:
            self.y += self.b
        return self.y

    def backward(self, grad):
        # 反向传播,按照链式法则计算
        # grad的维度为(batch_size, num_out)
        # 梯度应该对batch_size去平均值
        # grad_W的维度应该与W相同,为(num_in, num_out)
        self.grad_W = self.x.T @ grad / grad.shape[0]
        if self.use_bias:
            # grad_b的维度与b相同,(1, num_out)
            self.grad_b = np.mean(grad, axis=0, keepdims=True)
        # 往上一层传递的grad维度应该为(batch_size, num_in)
        grad = grad @ self.W.T
        return grad

    def update(self, learning_rate):
        # 更新参数以完成梯度下降
        self.W -= learning_rate * self.grad_W
        if self.use_bias:
            self.b -= learning_rate * self.grad_b

2.3 MLP对数据集进行分类

MLP构建好之后,我们就可以开始进行训练了。由于输入维度为3,输出应该有三个类别

,因此输入和输出神经元都应该有3个。

随机梯度下降算法(SGD)

在机器学习中,随机梯度下降(Stochastic Gradient Descent, SGD)是一种非常常见的优化算法。与标准的梯度下降算法不同,SGD在每次迭代中使用一个或几个随机选取的样本来计算梯度,而不是使用全部训练数据。这样做的好处包括:

  1. 提高计算效率:对于大型数据集,使用全数据集计算梯度可能会非常耗时,而SGD只需要使用少量样本。
  2. 更好的泛化性能:由于每次迭代使用的样本不同,SGD的更新过程具有更多的随机性,这可以帮助模型跳出局部最优,获得更好的泛化性能。

下面是具体的训练过程:

import numpy as np
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

# 设置训练参数
num_epochs = 3000
learning_rate = 0.05
batch_size = 20
eps = 1e-7 # 用于防止除0,log(0)等问题

# 创建一个层大小依次为3, 8, 3的多层感知机
# 对于多分类问题,使用softmax作为输出层的激活函数
mlp = MLP(layer_sizes=[3, 8, 3], use_bias=True, activation='relu', out_activation='softmax')

# 记录损失和准确率 
train_losses = [] 
test_losses = [] 
train_accuracies = [] 
test_accuracies = []

for epoch in range(num_epochs):
    st = 0
	loss = 0.0
	while st < len(X_train):
		ed = min(st + batch_size, len(X_train))
		# 取出batch
		x_batch = X_train[st:ed]
		y_batch = y_train_onehot[st:ed]
		# 计算MLP的预测
		y_pred = mlp.forward(x_batch)
		# 计算损失
		batch_loss = -np.sum(np.log(y_pred + eps) * y_batch) / y_batch.shape[0]
		loss += batch_loss
		# 计算梯度并进行反向传播
		grad = y_pred - y_batch
		mlp.backward(grad)
		# 更新参数
		mlp.update(learning_rate)
		st = ed
	loss /= (len(X_train) / batch_size)
	train_losses.append(loss)
	# 计算训练准确率
	train_acc = np.mean(np.argmax(mlp.forward(X_train), axis=1) == y_train)
	train_accuracies.append(train_acc)
	# 计算测试损失和准确率
	test_loss = -np.sum(np.log(mlp.forward(X_test) + eps) * y_test_onehot) / y_test_onehot.shape[0]
	test_losses.append(test_loss)
	test_acc = np.mean(np.argmax(mlp.forward(X_test), axis=1) == y_test)
	test_accuracies.append(test_acc)
	if epoch % 100 == 0:
		print(f'Epoch {epoch}, Train Loss: {loss:.4f}, Test Loss: {test_loss:.4f}, Train Acc: {train_acc:.4f}, Test Acc: {test_acc:.4f}')

# 可视化训练和测试的损失与准确率
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.plot(train_losses, label='Train Loss')
plt.plot(test_losses, label='Test Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Loss over Epochs')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(train_accuracies, label='Train Accuracy')
plt.plot(test_accuracies, label='Test Accuracy')
plt.xlabel('Epochs')
plt.ylabel('Accuracy')
plt.title('Accuracy over Epochs')
plt.legend()

plt.show()

解释

  • 数据准备:生成三类数据,每类20个样本,并将其标签独热编码。
  • MLP构建:通过线性层和激活层构建MLP模型。
  • SGD训练:使用随机梯度下降算法进行训练,并在每个epoch记录训练和测试的损失及准确率。
  • 可视化:使用Matplotlib库将训练和测试的损失与准确率可视化。

这样就实现了一个简单的MLP用于分类任务,并且可以通过训练结果的可视化来观察模型的性能。